#!/usr/bin/env bash

# BEGIN_COPYRIGHT
#
# Copyright 2009-2018 CRS4.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# END_COPYRIGHT

# Abort on command errors, on use of unset variables, and on a failure in any
# stage of a pipeline.
set -euo pipefail
# Trace every command when DEBUG is set to a non-empty value.
[ -n "${DEBUG:-}" ] && set -x
# Path of this script (falls back to $0 when BASH_SOURCE is unset) and the
# physical, symlink-resolved directory that contains it.
this="${BASH_SOURCE-$0}"
this_dir=$(cd -P -- "$(dirname -- "${this}")" && pwd -P)
# Shared example configuration. HADOOP, PYDOOP, PYTHON and build_parquet_jar
# are used below but not defined in this file -- presumably they come from
# here (TODO confirm).
. "${this_dir}/../../config.sh"

# Assembly jar used for the conversion step and passed to the jobs via
# --libjars. The path is quoted in the test so that a checkout location
# containing whitespace does not break the existence check.
PARQUET_JAR="${this_dir}"/../java/target/ParquetMR-assembly-0.1.jar
[ -f "${PARQUET_JAR}" ] || build_parquet_jar "${this_dir}"/../java
# Avro schema files: local paths, plus the names used for them on HDFS.
IN_SCHEMA_FILE_LOCAL="${this_dir}"/../schemas/user.avsc
IN_SCHEMA_FILE_HDFS=user.avsc
OUT_SCHEMA_FILE_LOCAL="${this_dir}"/../schemas/stats.avsc
# NOTE(review): OUT_SCHEMA_FILE_HDFS is not referenced anywhere else in this
# script.
OUT_SCHEMA_FILE_HDFS=stats.avsc

# --- create input ---
# Local scratch directory that receives the generated CSV files.
CSV_INPUT=$(mktemp -d)
# Remove the scratch directory on every exit path, including an abort caused
# by `set -e` in a later step.
# NOTE(review): this replaces any EXIT trap installed earlier (e.g. by
# config.sh) -- confirm there is none.
trap 'rm -rf -- "${CSV_INPUT}"' EXIT
# The scratch directory's base name doubles as the HDFS input path.
INPUT=$(basename -- "${CSV_INPUT}")
PARQUETS_DIR=parquets
# First argument handed to create_input.py -- presumably the number of records
# per file (TODO confirm against create_input.py).
N=20
# Generate two CSV files. create_input.py is resolved relative to the current
# working directory, so the script has to be started from its own directory.
for i in 1 2; do
    ${PYTHON} create_input.py "${N}" "${CSV_INPUT}/users_${i}.csv"
done

# --- convert to avro-parquet ---
${HADOOP} fs -mkdir -p /user/"${USER}"
# Best-effort removal of leftovers from a previous run: `|| :` keeps a failed
# removal from aborting the script under `set -e`.
${HADOOP} fs -rm -r /user/"${USER}"/"${PARQUETS_DIR}" || :
${HADOOP} fs -rm -r "${INPUT}" || :
# Upload the directory of generated CSV files.
${HADOOP} fs -put "${CSV_INPUT}" "${INPUT}"

# Upload the input schema, overwriting any existing copy (-f).
${HADOOP} fs -put -f "${IN_SCHEMA_FILE_LOCAL}" "${IN_SCHEMA_FILE_HDFS}"
# Run the converter class from the assembly jar on the uploaded CSV directory,
# writing to PARQUETS_DIR with the uploaded schema. Invoked through ${HADOOP},
# like every other Hadoop call in this script, so that a configured launcher
# is honoured here as well.
${HADOOP} jar "${PARQUET_JAR}" it.crs4.pydoop.WriteParquet \
    "${INPUT}" "${PARQUETS_DIR}" "${IN_SCHEMA_FILE_HDFS}"

# --- run color count ---
# Job settings. The module name also determines the uploaded source file and
# the job name.
MODULE=avro_value_in_out
MPY="${MODULE}.py"
JOBNAME="${MODULE}-job"
LOGLEVEL=DEBUG
INPUT_FORMAT=org.apache.parquet.avro.AvroParquetInputFormat
OUTPUT_FORMAT=org.apache.parquet.avro.AvroParquetOutputFormat
CC_OUTPUT=cc_output
# Text of the output schema; it is passed to the job through two properties.
STATS_SCHEMA=$(<"${OUT_SCHEMA_FILE_LOCAL}")

# Drop the output of a previous run, ignoring a failed removal.
${HADOOP} fs -rm -r "/user/${USER}/${CC_OUTPUT}" || :

# Arguments for `pydoop submit`, collected in an array so that each option
# sits on its own line without continuation backslashes. The job reads
# PARQUETS_DIR and writes CC_OUTPUT.
submit_args=(
    -D "pydoop.mapreduce.avro.value.output.schema=${STATS_SCHEMA}"
    -D "parquet.avro.schema=${STATS_SCHEMA}"
    --upload-file-to-cache avro_base.py
    --upload-file-to-cache "${MPY}"
    --num-reducers 1
    --input-format "${INPUT_FORMAT}"
    --output-format "${OUTPUT_FORMAT}"
    --avro-input v
    --avro-output v
    --libjars "${PARQUET_JAR}"
    --log-level "${LOGLEVEL}"
    --job-name "${JOBNAME}"
    "${MODULE}" "${PARQUETS_DIR}" "${CC_OUTPUT}"
)
${PYDOOP} submit "${submit_args[@]}"

# --- dump results ---
# Settings for the second job, which reads the output of the previous job
# (CC_OUTPUT) and writes to OUTPUT. No output schema is passed to this job,
# so the schema file is not read again here.
MODULE=avro_parquet_dump_results
MPY="${MODULE}".py
JOBNAME="${MODULE}"-job
LOGLEVEL="DEBUG"
INPUT_FORMAT=org.apache.parquet.avro.AvroParquetInputFormat
OUTPUT=results

# Drop the output of a previous run, ignoring a failed removal.
${HADOOP} fs -rm -r /user/"${USER}"/"${OUTPUT}" || :
# Map-only job (zero reducers).
${PYDOOP} submit \
    --upload-file-to-cache "${MPY}" \
    --num-reducers 0 \
    --input-format "${INPUT_FORMAT}" \
    --avro-input v \
    --libjars "${PARQUET_JAR}" \
    --log-level "${LOGLEVEL}" \
    --job-name "${JOBNAME}" \
    "${MODULE}" "${CC_OUTPUT}" "${OUTPUT}"

# --- check results ---
# Start from a clean local copy of the job output in the current directory.
rm -rf -- "${OUTPUT}"
# Fetch the results and compare them with the generated input. The exit status
# is captured instead of letting `set -e` abort right away, so that the local
# scratch data is removed whether or not the check succeeds. The HDFS copies
# are not touched.
check_status=0
{
    ${HADOOP} fs -get /user/"${USER}"/"${OUTPUT}" &&
        ${PYTHON} check_results.py "${CSV_INPUT}" "${OUTPUT}"
} || check_status=$?

rm -rf -- "${CSV_INPUT}" "${OUTPUT}"
# Propagate a failed download or check as the script's exit status.
[ "${check_status}" -eq 0 ] || exit "${check_status}"
